home *** CD-ROM | disk | FTP | other *** search
/ Isometric Game Programming with DirectX 7.0 / Isometric Game Programming.iso / directx / dxf / samples / multimedia / directshow / baseclasses / outputq.cpp < prev    next >
Encoding:
C/C++ Source or Header  |  2000-10-02  |  23.4 KB  |  794 lines

  1. //------------------------------------------------------------------------------
  2. // File: OutputQ.cpp
  3. //
  4. // Desc: DirectShow base classes - implements COutputQueue class used by an
  5. //       output pin which may sometimes want to queue output samples on a
  6. //       separate thread and sometimes call Receive() directly on the input
  7. //       pin.
  8. //
  9. // Copyright (c) 1992 - 2000, Microsoft Corporation.  All rights reserved.
  10. //------------------------------------------------------------------------------
  11.  
  12.  
  13. #include <streams.h>
  14.  
  15.  
  16. //
  17. //  COutputQueue Constructor :
  18. //
  19. //  Determines if a thread is to be created and creates resources
  20. //
  21. //     pInputPin  - the downstream input pin we're queueing samples to
  22. //
  23. //     phr        - changed to a failure code if this function fails
  24. //                  (otherwise unchanges)
  25. //
  26. //     bAuto      - Ask pInputPin if it can block in Receive by calling
  27. //                  its ReceiveCanBlock method and create a thread if
  28. //                  it can block, otherwise not.
  29. //
  30. //     bQueue     - if bAuto == FALSE then we create a thread if and only
  31. //                  if bQueue == TRUE
  32. //
  33. //     lBatchSize - work in batches of lBatchSize
  34. //
  35. //     bBatchEact - Use exact batch sizes so don't send until the
  36. //                  batch is full or SendAnyway() is called
  37. //
  38. //     lListSize  - If we create a thread make the list of samples queued
  39. //                  to the thread have this size cache
  40. //
  41. //     dwPriority - If we create a thread set its priority to this
  42. //
  43. COutputQueue::COutputQueue(
  44.              IPin         *pInputPin,          //  Pin to send stuff to
  45.              HRESULT      *phr,                //  'Return code'
  46.              BOOL          bAuto,              //  Ask pin if queue or not
  47.              BOOL          bQueue,             //  Send through queue
  48.              LONG          lBatchSize,         //  Batch
  49.              BOOL          bBatchExact,        //  Batch exactly to BatchSize
  50.              LONG          lListSize,
  51.              DWORD         dwPriority,
  52.              bool          bFlushingOpt        // flushing optimization
  53.             ) : m_lBatchSize(lBatchSize),
  54.                 m_bBatchExact(bBatchExact && (lBatchSize > 1)),
  55.                 m_hThread(NULL),
  56.                 m_hSem(NULL),
  57.                 m_List(NULL),
  58.                 m_pPin(pInputPin),
  59.                 m_ppSamples(NULL),
  60.                 m_lWaiting(0),
  61.                 m_pInputPin(NULL),
  62.                 m_bSendAnyway(FALSE),
  63.                 m_nBatched(0),
  64.                 m_bFlushing(FALSE),
  65.                 m_bFlushed(TRUE),
  66.                 m_bFlushingOpt(bFlushingOpt),
  67.                 m_bTerminate(FALSE),
  68.                 m_hEventPop(NULL),
  69.                 m_hr(S_OK)
  70. {
  71.     ASSERT(m_lBatchSize > 0);
  72.  
  73.  
  74.     if (FAILED(*phr)) {
  75.         return;
  76.     }
  77.  
  78.     //  Check the input pin is OK and cache its IMemInputPin interface
  79.  
  80.     *phr = pInputPin->QueryInterface(IID_IMemInputPin, (void **)&m_pInputPin);
  81.     if (FAILED(*phr)) {
  82.         return;
  83.     }
  84.  
  85.     // See if we should ask the downstream pin
  86.  
  87.     if (bAuto) {
  88.         HRESULT hr = m_pInputPin->ReceiveCanBlock();
  89.         if (SUCCEEDED(hr)) {
  90.             bQueue = hr == S_OK;
  91.         }
  92.     }
  93.  
  94.     //  Create our sample batch
  95.  
  96.     m_ppSamples = new PMEDIASAMPLE[m_lBatchSize];
  97.     if (m_ppSamples == NULL) {
  98.         *phr = E_OUTOFMEMORY;
  99.         return;
  100.     }
  101.  
  102.     //  If we're queueing allocate resources
  103.  
  104.     if (bQueue) {
  105.         DbgLog((LOG_TRACE, 2, TEXT("Creating thread for output pin")));
  106.         m_hSem = CreateSemaphore(NULL, 0, 0x7FFFFFFF, NULL);
  107.         if (m_hSem == NULL) {
  108.             DWORD dwError = GetLastError();
  109.             *phr = AmHresultFromWin32(dwError);
  110.             return;
  111.         }
  112.         m_List = new CSampleList(NAME("Sample Queue List"),
  113.                                  lListSize,
  114.                                  FALSE         // No lock
  115.                                 );
  116.         if (m_List == NULL) {
  117.             *phr = E_OUTOFMEMORY;
  118.             return;
  119.         }
  120.  
  121.  
  122.         DWORD dwThreadId;
  123.         m_hThread = CreateThread(NULL,
  124.                                  0,
  125.                                  InitialThreadProc,
  126.                                  (LPVOID)this,
  127.                                  0,
  128.                                  &dwThreadId);
  129.         if (m_hThread == NULL) {
  130.             DWORD dwError = GetLastError();
  131.             *phr = AmHresultFromWin32(dwError);
  132.             return;
  133.         }
  134.         SetThreadPriority(m_hThread, dwPriority);
  135.     } else {
  136.         DbgLog((LOG_TRACE, 2, TEXT("Calling input pin directly - no thread")));
  137.     }
  138. }
  139.  
  140. //
  141. //  COutputQueuee Destructor :
  142. //
  143. //  Free all resources -
  144. //
  145. //      Thread,
  146. //      Batched samples
  147. //
  148. COutputQueue::~COutputQueue()
  149. {
  150.     DbgLog((LOG_TRACE, 3, TEXT("COutputQueue::~COutputQueue")));
  151.     /*  Free our pointer */
  152.     if (m_pInputPin != NULL) {
  153.         m_pInputPin->Release();
  154.     }
  155.     if (m_hThread != NULL) {
  156.         {
  157.             CAutoLock lck(this);
  158.             m_bTerminate = TRUE;
  159.             m_hr = S_FALSE;
  160.             NotifyThread();
  161.         }
  162.         DbgWaitForSingleObject(m_hThread);
  163.         EXECUTE_ASSERT(CloseHandle(m_hThread));
  164.  
  165.         //  The thread frees the samples when asked to terminate
  166.  
  167.         ASSERT(m_List->GetCount() == 0);
  168.         delete m_List;
  169.     } else {
  170.         FreeSamples();
  171.     }
  172.     if (m_hSem != NULL) {
  173.         EXECUTE_ASSERT(CloseHandle(m_hSem));
  174.     }
  175.     delete [] m_ppSamples;
  176. }
  177.  
  178. //
  179. //  Call the real thread proc as a member function
  180. //
  181. DWORD WINAPI COutputQueue::InitialThreadProc(LPVOID pv)
  182. {
  183.     HRESULT hrCoInit = CAMThread::CoInitializeHelper();
  184.     
  185.     COutputQueue *pSampleQueue = (COutputQueue *)pv;
  186.     DWORD dwReturn = pSampleQueue->ThreadProc();
  187.  
  188.     if(hrCoInit == S_OK) {
  189.         CoUninitialize();
  190.     }
  191.     
  192.     return dwReturn;
  193. }
  194.  
  195. //
  196. //  Thread sending the samples downstream :
  197. //
  198. //  When there is nothing to do the thread sets m_lWaiting (while
  199. //  holding the critical section) and then waits for m_hSem to be
  200. //  set (not holding the critical section)
  201. //
  202. DWORD COutputQueue::ThreadProc()
  203. {
  204.     while (TRUE) {
  205.         BOOL          bWait = FALSE;
  206.         IMediaSample *pSample;
  207.         LONG          lNumberToSend; // Local copy
  208.         NewSegmentPacket* ppacket;
  209.  
  210.         //
  211.         //  Get a batch of samples and send it if possible
  212.         //  In any case exit the loop if there is a control action
  213.         //  requested
  214.         //
  215.         {
  216.             CAutoLock lck(this);
  217.             while (TRUE) {
  218.  
  219.                 if (m_bTerminate) {
  220.                     FreeSamples();
  221.                     return 0;
  222.                 }
  223.                 if (m_bFlushing) {
  224.                     FreeSamples();
  225.                     SetEvent(m_evFlushComplete);
  226.                 }
  227.  
  228.                 //  Get a sample off the list
  229.  
  230.                 pSample = m_List->RemoveHead();
  231.         // inform derived class we took something off the queue
  232.         if (m_hEventPop) {
  233.                     //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  234.             SetEvent(m_hEventPop);
  235.         }
  236.  
  237.                 if (pSample != NULL &&
  238.                     !IsSpecialSample(pSample)) {
  239.  
  240.                     //  If its just a regular sample just add it to the batch
  241.                     //  and exit the loop if the batch is full
  242.  
  243.                     m_ppSamples[m_nBatched++] = pSample;
  244.                     if (m_nBatched == m_lBatchSize) {
  245.                         break;
  246.                     }
  247.                 } else {
  248.  
  249.                     //  If there was nothing in the queue and there's nothing
  250.                     //  to send (either because there's nothing or the batch
  251.                     //  isn't full) then prepare to wait
  252.  
  253.                     if (pSample == NULL &&
  254.                         (m_bBatchExact || m_nBatched == 0)) {
  255.  
  256.                         //  Tell other thread to set the event when there's
  257.                         //  something do to
  258.  
  259.                         ASSERT(m_lWaiting == 0);
  260.                         m_lWaiting++;
  261.                         bWait      = TRUE;
  262.                     } else {
  263.  
  264.                         //  We break out of the loop on SEND_PACKET unless
  265.                         //  there's nothing to send
  266.  
  267.                         if (pSample == SEND_PACKET && m_nBatched == 0) {
  268.                             continue;
  269.                         }
  270.  
  271.                         if (pSample == NEW_SEGMENT) {
  272.                             // now we need the parameters - we are
  273.                             // guaranteed that the next packet contains them
  274.                             ppacket = (NewSegmentPacket *) m_List->RemoveHead();
  275.                 // we took something off the queue
  276.                 if (m_hEventPop) {
  277.                                 //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  278.                         SetEvent(m_hEventPop);
  279.                 }
  280.  
  281.                             ASSERT(ppacket);
  282.                         }
  283.                         //  EOS_PACKET falls through here and we exit the loop
  284.                         //  In this way it acts like SEND_PACKET
  285.                     }
  286.                     break;
  287.                 }
  288.             }
  289.             if (!bWait) {
  290.                 // We look at m_nBatched from the client side so keep
  291.                 // it up to date inside the critical section
  292.                 lNumberToSend = m_nBatched;  // Local copy
  293.                 m_nBatched = 0;
  294.             }
  295.         }
  296.  
  297.         //  Wait for some more data
  298.  
  299.         if (bWait) {
  300.             DbgWaitForSingleObject(m_hSem);
  301.             continue;
  302.         }
  303.  
  304.  
  305.  
  306.         //  OK - send it if there's anything to send
  307.         //  We DON'T check m_bBatchExact here because either we've got
  308.         //  a full batch or we dropped through because we got
  309.         //  SEND_PACKET or EOS_PACKET - both of which imply we should
  310.         //  flush our batch
  311.  
  312.         if (lNumberToSend != 0) {
  313.             long nProcessed;
  314.             if (m_hr == S_OK) {
  315.                 ASSERT(!m_bFlushed);
  316.                 HRESULT hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  317.                                                           lNumberToSend,
  318.                                                           &nProcessed);
  319.                 /*  Don't overwrite a flushing state HRESULT */
  320.                 CAutoLock lck(this);
  321.                 if (m_hr == S_OK) {
  322.                     m_hr = hr;
  323.                 }
  324.                 ASSERT(!m_bFlushed);
  325.             }
  326.             while (lNumberToSend != 0) {
  327.                 m_ppSamples[--lNumberToSend]->Release();
  328.             }
  329.             if (m_hr != S_OK) {
  330.  
  331.                 //  In any case wait for more data - S_OK just
  332.                 //  means there wasn't an error
  333.  
  334.                 DbgLog((LOG_ERROR, 2, TEXT("ReceiveMultiple returned %8.8X"),
  335.                        m_hr));
  336.             }
  337.         }
  338.  
  339.         //  Check for end of stream
  340.  
  341.         if (pSample == EOS_PACKET) {
  342.  
  343.             //  We don't send even end of stream on if we've previously
  344.             //  returned something other than S_OK
  345.             //  This is because in that case the pin which returned
  346.             //  something other than S_OK should have either sent
  347.             //  EndOfStream() or notified the filter graph
  348.  
  349.             if (m_hr == S_OK) {
  350.                 DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  351.                 HRESULT hr = m_pPin->EndOfStream();
  352.                 if (FAILED(hr)) {
  353.                     DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  354.                 }
  355.             }
  356.         }
  357.  
  358.         //  Data from a new source
  359.  
  360.         if (pSample == RESET_PACKET) {
  361.             m_hr = S_OK;
  362.             SetEvent(m_evFlushComplete);
  363.         }
  364.  
  365.         if (pSample == NEW_SEGMENT) {
  366.             m_pPin->NewSegment(ppacket->tStart, ppacket->tStop, ppacket->dRate);
  367.             delete ppacket;
  368.         }
  369.     }
  370. }
  371.  
  372. //  Send batched stuff anyway
  373. void COutputQueue::SendAnyway()
  374. {
  375.     if (!IsQueued()) {
  376.  
  377.         //  m_bSendAnyway is a private parameter checked in ReceiveMultiple
  378.  
  379.         m_bSendAnyway = TRUE;
  380.         LONG nProcessed;
  381.         ReceiveMultiple(NULL, 0, &nProcessed);
  382.         m_bSendAnyway = FALSE;
  383.  
  384.     } else {
  385.         CAutoLock lck(this);
  386.         QueueSample(SEND_PACKET);
  387.         NotifyThread();
  388.     }
  389. }
  390.  
  391. void
  392. COutputQueue::NewSegment(
  393.     REFERENCE_TIME tStart,
  394.     REFERENCE_TIME tStop,
  395.     double dRate)
  396. {
  397.     if (!IsQueued()) {
  398.         if (S_OK == m_hr) {
  399.             if (m_bBatchExact) {
  400.                 SendAnyway();
  401.             }
  402.             m_pPin->NewSegment(tStart, tStop, dRate);
  403.         }
  404.     } else {
  405.         if (m_hr == S_OK) {
  406.             //
  407.             // we need to queue the new segment to appear in order in the
  408.             // data, but we need to pass parameters to it. Rather than
  409.             // take the hit of wrapping every single sample so we can tell
  410.             // special ones apart, we queue special pointers to indicate
  411.             // special packets, and we guarantee (by holding the
  412.             // critical section) that the packet immediately following a
  413.             // NEW_SEGMENT value is a NewSegmentPacket containing the
  414.             // parameters.
  415.             NewSegmentPacket * ppack = new NewSegmentPacket;
  416.             if (ppack == NULL) {
  417.                 return;
  418.             }
  419.             ppack->tStart = tStart;
  420.             ppack->tStop = tStop;
  421.             ppack->dRate = dRate;
  422.  
  423.             CAutoLock lck(this);
  424.             QueueSample(NEW_SEGMENT);
  425.             QueueSample( (IMediaSample*) ppack);
  426.             NotifyThread();
  427.         }
  428.     }
  429. }
  430.  
  431.  
  432. //
  433. //  End of Stream is queued to output device
  434. //
  435. void COutputQueue::EOS()
  436. {
  437.     CAutoLock lck(this);
  438.     if (!IsQueued()) {
  439.         if (m_bBatchExact) {
  440.             SendAnyway();
  441.         }
  442.         if (m_hr == S_OK) {
  443.             DbgLog((LOG_TRACE, 2, TEXT("COutputQueue sending EndOfStream()")));
  444.             m_bFlushed = FALSE;
  445.             HRESULT hr = m_pPin->EndOfStream();
  446.             if (FAILED(hr)) {
  447.                 DbgLog((LOG_ERROR, 2, TEXT("COutputQueue got code 0x%8.8X from EndOfStream()")));
  448.             }
  449.         }
  450.     } else {
  451.         if (m_hr == S_OK) {
  452.             m_bFlushed = FALSE;
  453.             QueueSample(EOS_PACKET);
  454.             NotifyThread();
  455.         }
  456.     }
  457. }
  458.  
  459. //
  460. //  Flush all the samples in the queue
  461. //
  462. void COutputQueue::BeginFlush()
  463. {
  464.     if (IsQueued()) {
  465.         {
  466.             CAutoLock lck(this);
  467.  
  468.             // block receives -- we assume this is done by the
  469.             // filter in which we are a component
  470.  
  471.             // discard all queued data
  472.  
  473.             m_bFlushing = TRUE;
  474.  
  475.             //  Make sure we discard all samples from now on
  476.  
  477.             if (m_hr == S_OK) {
  478.                 m_hr = S_FALSE;
  479.             }
  480.  
  481.             // Optimize so we don't keep calling downstream all the time
  482.  
  483.             if (m_bFlushed && m_bFlushingOpt) {
  484.                 return;
  485.             }
  486.  
  487.             // Make sure we really wait for the flush to complete
  488.             m_evFlushComplete.Reset();
  489.  
  490.             NotifyThread();
  491.         }
  492.  
  493.         // pass this downstream
  494.  
  495.         m_pPin->BeginFlush();
  496.     } else {
  497.         // pass downstream first to avoid deadlocks
  498.         m_pPin->BeginFlush();
  499.         CAutoLock lck(this);
  500.         // discard all queued data
  501.  
  502.         m_bFlushing = TRUE;
  503.  
  504.         //  Make sure we discard all samples from now on
  505.  
  506.         if (m_hr == S_OK) {
  507.             m_hr = S_FALSE;
  508.         }
  509.     }
  510.  
  511. }
  512.  
  513. //
  514. // leave flush mode - pass this downstream
  515. void COutputQueue::EndFlush()
  516. {
  517.     {
  518.         CAutoLock lck(this);
  519.         ASSERT(m_bFlushing);
  520.         if (m_bFlushingOpt && m_bFlushed && IsQueued()) {
  521.             m_bFlushing = FALSE;
  522.             m_hr = S_OK;
  523.             return;
  524.         }
  525.     }
  526.  
  527.     // sync with pushing thread -- done in BeginFlush
  528.     // ensure no more data to go downstream -- done in BeginFlush
  529.     //
  530.     // Because we are synching here there is no need to hold the critical
  531.     // section (in fact we'd deadlock if we did!)
  532.  
  533.     if (IsQueued()) {
  534.         m_evFlushComplete.Wait();
  535.     } else {
  536.         FreeSamples();
  537.     }
  538.  
  539.     //  Be daring - the caller has guaranteed no samples will arrive
  540.     //  before EndFlush() returns
  541.  
  542.     m_bFlushing = FALSE;
  543.     m_bFlushed  = TRUE;
  544.  
  545.     // call EndFlush on downstream pins
  546.  
  547.     m_pPin->EndFlush();
  548.  
  549.     m_hr = S_OK;
  550. }
  551.  
  552. //  COutputQueue::QueueSample
  553. //
  554. //  private method to Send a sample to the output queue
  555. //  The critical section MUST be held when this is called
  556.  
  557. void COutputQueue::QueueSample(IMediaSample *pSample)
  558. {
  559.     if (NULL == m_List->AddTail(pSample)) {
  560.         if (!IsSpecialSample(pSample)) {
  561.             pSample->Release();
  562.         }
  563.     }
  564. }
  565.  
  566. //
  567. //  COutputQueue::Receive()
  568. //
  569. //  Send a single sample by the multiple sample route
  570. //  (NOTE - this could be optimized if necessary)
  571. //
  572. //  On return the sample will have been Release()'d
  573. //
  574.  
  575. HRESULT COutputQueue::Receive(IMediaSample *pSample)
  576. {
  577.     LONG nProcessed;
  578.     return ReceiveMultiple(&pSample, 1, &nProcessed);
  579. }
  580.  
  581. //
  582. //  COutputQueue::ReceiveMultiple()
  583. //
  584. //  Send a set of samples to the downstream pin
  585. //
  586. //      ppSamples           - array of samples
  587. //      nSamples            - how many
  588. //      nSamplesProcessed   - How many were processed
  589. //
  590. //  On return all samples will have been Release()'d
  591. //
  592.  
  593. HRESULT COutputQueue::ReceiveMultiple (
  594.     IMediaSample **ppSamples,
  595.     long nSamples,
  596.     long *nSamplesProcessed)
  597. {
  598.     CAutoLock lck(this);
  599.     //  Either call directly or queue up the samples
  600.  
  601.     if (!IsQueued()) {
  602.  
  603.         //  If we already had a bad return code then just return
  604.  
  605.         if (S_OK != m_hr) {
  606.  
  607.             //  If we've never received anything since the last Flush()
  608.             //  and the sticky return code is not S_OK we must be
  609.             //  flushing
  610.             //  ((!A || B) is equivalent to A implies B)
  611.             ASSERT(!m_bFlushed || m_bFlushing);
  612.  
  613.             //  We're supposed to Release() them anyway!
  614.             *nSamplesProcessed = 0;
  615.             for (int i = 0; i < nSamples; i++) {
  616.                 DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (direct) : Discarding %d samples code 0x%8.8X"),
  617.                         nSamples, m_hr));
  618.                 ppSamples[i]->Release();
  619.             }
  620.  
  621.             return m_hr;
  622.         }
  623.         //
  624.         //  If we're flushing the sticky return code should be S_FALSE
  625.         //
  626.         ASSERT(!m_bFlushing);
  627.         m_bFlushed = FALSE;
  628.  
  629.         ASSERT(m_nBatched < m_lBatchSize);
  630.         ASSERT(m_nBatched == 0 || m_bBatchExact);
  631.  
  632.         //  Loop processing the samples in batches
  633.  
  634.         LONG iLost = 0;
  635.         for (long iDone = 0;
  636.              iDone < nSamples || (m_nBatched != 0 && m_bSendAnyway);
  637.             ) {
  638.  
  639. //pragma message (REMIND("Implement threshold scheme"))
  640.             ASSERT(m_nBatched < m_lBatchSize);
  641.             if (iDone < nSamples) {
  642.                 m_ppSamples[m_nBatched++] = ppSamples[iDone++];
  643.             }
  644.             if (m_nBatched == m_lBatchSize ||
  645.                 nSamples == 0 && (m_bSendAnyway || !m_bBatchExact)) {
  646.                 LONG nDone;
  647.                 DbgLog((LOG_TRACE, 4, TEXT("Batching %d samples"),
  648.                        m_nBatched));
  649.  
  650.                 if (m_hr == S_OK) {
  651.                     m_hr = m_pInputPin->ReceiveMultiple(m_ppSamples,
  652.                                                         m_nBatched,
  653.                                                         &nDone);
  654.                 } else {
  655.                     nDone = 0;
  656.                 }
  657.                 iLost += m_nBatched - nDone;
  658.                 for (LONG i = 0; i < m_nBatched; i++) {
  659.                     m_ppSamples[i]->Release();
  660.                 }
  661.                 m_nBatched = 0;
  662.             }
  663.         }
  664.         *nSamplesProcessed = iDone - iLost;
  665.         if (*nSamplesProcessed < 0) {
  666.             *nSamplesProcessed = 0;
  667.         }
  668.         return m_hr;
  669.     } else {
  670.         /*  We're sending to our thread */
  671.  
  672.         if (m_hr != S_OK) {
  673.             *nSamplesProcessed = 0;
  674.             DbgLog((LOG_TRACE, 3, TEXT("COutputQueue (queued) : Discarding %d samples code 0x%8.8X"),
  675.                     nSamples, m_hr));
  676.             for (int i = 0; i < nSamples; i++) {
  677.                 ppSamples[i]->Release();
  678.             }
  679.             return m_hr;
  680.         }
  681.         m_bFlushed = FALSE;
  682.         for (long i = 0; i < nSamples; i++) {
  683.             QueueSample(ppSamples[i]);
  684.         }
  685.         *nSamplesProcessed = nSamples;
  686.         if (!m_bBatchExact ||
  687.             m_nBatched + m_List->GetCount() >= m_lBatchSize) {
  688.             NotifyThread();
  689.         }
  690.         return S_OK;
  691.     }
  692. }
  693.  
  694. //  Get ready for new data - cancels sticky m_hr
  695. void COutputQueue::Reset()
  696. {
  697.     if (!IsQueued()) {
  698.         m_hr = S_OK;
  699.     } else {
  700.         CAutoLock lck(this);
  701.         QueueSample(RESET_PACKET);
  702.         NotifyThread();
  703.         m_evFlushComplete.Wait();
  704.     }
  705. }
  706.  
  707. //  Remove and Release() all queued and Batched samples
  708. void COutputQueue::FreeSamples()
  709. {
  710.     CAutoLock lck(this);
  711.     if (IsQueued()) {
  712.         while (TRUE) {
  713.             IMediaSample *pSample = m_List->RemoveHead();
  714.         // inform derived class we took something off the queue
  715.         if (m_hEventPop) {
  716.                 //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  717.             SetEvent(m_hEventPop);
  718.         }
  719.  
  720.             if (pSample == NULL) {
  721.                 break;
  722.             }
  723.             if (!IsSpecialSample(pSample)) {
  724.                 pSample->Release();
  725.             } else {
  726.                 if (pSample == NEW_SEGMENT) {
  727.                     //  Free NEW_SEGMENT packet
  728.                     NewSegmentPacket *ppacket =
  729.                         (NewSegmentPacket *) m_List->RemoveHead();
  730.             // inform derived class we took something off the queue
  731.             if (m_hEventPop) {
  732.                         //DbgLog((LOG_TRACE,3,TEXT("Queue: Delivered  SET EVENT")));
  733.                 SetEvent(m_hEventPop);
  734.             }
  735.  
  736.                     ASSERT(ppacket != NULL);
  737.                     delete ppacket;
  738.                 }
  739.             }
  740.         }
  741.     }
  742.     for (int i = 0; i < m_nBatched; i++) {
  743.         m_ppSamples[i]->Release();
  744.     }
  745.     m_nBatched = 0;
  746. }
  747.  
  748. //  Notify the thread if there is something to do
  749. //
  750. //  The critical section MUST be held when this is called
  751. void COutputQueue::NotifyThread()
  752. {
  753.     //  Optimize - no need to signal if it's not waiting
  754.     ASSERT(IsQueued());
  755.     if (m_lWaiting) {
  756.         ReleaseSemaphore(m_hSem, m_lWaiting, NULL);
  757.         m_lWaiting = 0;
  758.     }
  759. }
  760.  
  761. //  See if there's any work to do
  762. //  Returns
  763. //      TRUE  if there is nothing on the queue and nothing in the batch
  764. //            and all data has been sent
  765. //      FALSE otherwise
  766. //
  767. BOOL COutputQueue::IsIdle()
  768. {
  769.     CAutoLock lck(this);
  770.  
  771.     //  We're idle if
  772.     //      there is no thread (!IsQueued()) OR
  773.     //      the thread is waiting for more work  (m_lWaiting != 0)
  774.     //  AND
  775.     //      there's nothing in the current batch (m_nBatched == 0)
  776.  
  777.     if (IsQueued() && m_lWaiting == 0 || m_nBatched != 0) {
  778.         return FALSE;
  779.     } else {
  780.  
  781.         //  If we're idle it shouldn't be possible for there
  782.         //  to be anything on the work queue
  783.  
  784.         ASSERT(!IsQueued() || m_List->GetCount() == 0);
  785.         return TRUE;
  786.     }
  787. }
  788.  
  789.  
  790. void COutputQueue::SetPopEvent(HANDLE hEvent)
  791. {
  792.     m_hEventPop = hEvent;
  793. }
  794.